feat: Expose prediction SSE streams #3019

Open
markphelps wants to merge 15 commits into main from cog-streaming-2-sse

Conversation

@markphelps
Collaborator

@markphelps markphelps commented May 14, 2026

Summary

  • Reuse POST /predictions and PUT /predictions/{id} for Server-Sent Events by returning an SSE stream when requests send Accept: text/event-stream.
  • Gate SSE behind the model author's explicit @streaming / @cog.streaming opt-in while preserving existing iterator JSON behavior for non-SSE requests.
  • Preserve existing non-SSE behavior: sync prediction requests still return the blocking JSON response, and Prefer: respond-async without SSE still returns 202 JSON.
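
The negotiation rules in this summary can be sketched roughly as follows. This is an illustrative Python model, not the coglet implementation: `wants_sse`, `negotiate`, and the returned labels are hypothetical names, and the sketch assumes only an explicit `text/event-stream` media type (not `*/*`) selects SSE. It also does not model how `Prefer: respond-async` combines with an SSE request.

```python
def wants_sse(accept_header: str) -> bool:
    """Return True if the Accept header explicitly lists text/event-stream."""
    for part in accept_header.split(","):
        # Drop parameters such as ";q=0.9" before comparing the media type.
        media_type = part.split(";", 1)[0].strip().lower()
        if media_type == "text/event-stream":
            return True
    return False


def negotiate(accept: str, prefer: str, streaming_opted_in: bool) -> str:
    """Pick a response style for a prediction request (hypothetical labels)."""
    if wants_sse(accept):
        # SSE is gated on the model author's opt-in; otherwise 406.
        return "sse" if streaming_opted_in else "406"
    if "respond-async" in prefer.lower():
        return "json-async-202"
    return "json-sync"
```

For example, `negotiate("text/event-stream", "", False)` yields `"406"`, while a plain JSON request is unaffected by whether the model opted in.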

Streaming behavior

  • Stream prediction lifecycle events: start, output, log, metric, and terminal completed events.
  • Replay retained events for clients that reconnect to an in-flight prediction with PUT /predictions/{id} and Accept: text/event-stream.
  • Bound replay history and live subscribers to avoid unbounded memory growth.
  • Add COG_STREAM_HISTORY_CAPACITY to configure per-prediction SSE replay history; 0 disables replay while keeping live streaming enabled.
  • Return 406 Not Acceptable for SSE requests when a model has not opted in, and for training endpoints which do not support SSE.
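
A minimal Python sketch of the replay-capacity behavior described above, under stated assumptions: the default of 1024 is taken from the figure quoted later in this thread, falling back to the default on invalid values is a guess (the PR only says invalid values are tested), and `history_capacity` and `StreamSketch` are hypothetical names rather than coglet APIs.

```python
from collections import deque

DEFAULT_HISTORY_CAPACITY = 1024  # assumed default, not confirmed by the PR text


def history_capacity(env: dict) -> int:
    """Read COG_STREAM_HISTORY_CAPACITY from an environment mapping."""
    raw = env.get("COG_STREAM_HISTORY_CAPACITY")
    if raw is None:
        return DEFAULT_HISTORY_CAPACITY
    try:
        value = int(raw)
    except ValueError:
        return DEFAULT_HISTORY_CAPACITY  # assumption: invalid input uses the default
    return value if value >= 0 else DEFAULT_HISTORY_CAPACITY


class StreamSketch:
    """Bounded replay history plus live fan-out to subscribers."""

    def __init__(self, capacity: int):
        # deque(maxlen=0) silently discards every append, which models
        # "replay disabled" without special-casing emit().
        self.history = deque(maxlen=capacity)
        self.subscribers = []

    def subscribe(self) -> list:
        inbox = list(self.history)  # replay retained events first
        self.subscribers.append(inbox)
        return inbox

    def emit(self, event) -> None:
        self.history.append(event)
        for inbox in self.subscribers:
            inbox.append(event)  # live delivery is independent of history
```

With capacity 0, a client that subscribes after `start` was emitted sees only the events emitted afterwards, which matches "0 disables replay while keeping live streaming enabled".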

CLI behavior

  • This PR does not add a cog predict --stream option or otherwise change the predict CLI.
  • Predictions made through the Cog CLI continue to default to the existing non-streaming behavior.

Docs and tests

  • Document how to produce and consume streamed predictions in the HTTP docs.
  • Audit and expand docs/environment.md for public Cog-specific environment variables, including the new SSE replay history setting.
  • Add Rust unit coverage for replay history defaults, configured capacity, invalid values, disabled replay, terminal SSE events, subscriber caps, and concurrent subscriber cancellation semantics.
  • Add txtar integration coverage for configurable stream history, disabled replay, and custom metrics over SSE.

Contributor

@ask-bonk ask-bonk Bot left a comment

I'm Bonk, and I've done a quick review of your PR.

This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation looks solid overall with good test coverage. I found one potential resource concern that should be addressed:

Issues

1. Unbounded memory growth in stream_history (Medium)

The stream_history Vec in Prediction grows unbounded during prediction execution. For long-running predictions with frequent output chunks, logs, or metrics, this could cause memory exhaustion. Consider adding a limit to the history size or making it configurable.

Comment thread crates/coglet/src/prediction.rs Outdated
@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

I've completed my review of PR #3019. The PR adds SSE streaming for predictions with event replay functionality. The implementation is well-structured and includes comprehensive tests.

Summary of findings:

I identified one medium-severity issue regarding unbounded memory growth in the stream_history Vec. For long-running predictions that generate many output chunks, logs, or metrics, this could lead to memory exhaustion. I've posted a review comment suggesting either adding a configurable limit to the history size or implementing a mechanism to drop old events when a threshold is exceeded.

The rest of the implementation looks solid:

  • Good use of tokio::sync::broadcast for the streaming channel
  • Proper handling of late subscribers via replay
  • Correct cancellation semantics for sync vs async predictions
  • Comprehensive test coverage including integration tests
  • Clean SSE stream implementation with proper event formatting

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

Based on my review of PR #3019, I agree with the previous reviewer's finding about unbounded memory growth in stream_history. However, since that issue was already flagged, I have no additional actionable issues to report.

The implementation is well-structured:

  • Clean SSE stream implementation with proper event replay for late subscribers
  • Correct handling of sync vs async prediction cancellation semantics
  • Comprehensive test coverage including integration tests
  • Good documentation with working examples
  • Proper error handling for broadcast channel lag cases

LGTM

github run

Base automatically changed from cog-streaming-1 to main May 14, 2026 19:32
@markphelps markphelps force-pushed the cog-streaming-2-sse branch from cfa865e to c61e323 Compare May 14, 2026 19:38
@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

Based on my thorough review of PR #3019, I have analyzed all the changed files:

Summary of changes:
This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. It exposes GET /predictions/{id}/stream as an SSE endpoint that streams lifecycle events (start, output, log, metric, completed) in real-time.

Previous findings:
The unbounded memory growth issue in stream_history was already flagged by the previous reviewer. This is a legitimate concern where long-running predictions with frequent output chunks could cause memory exhaustion.

My independent assessment:
I've verified the following aspects of the implementation:

  1. Broadcast channel - Uses a 1024-slot channel with proper lag handling (logs warning and continues)
  2. Stream cancellation semantics - Correctly distinguishes between sync (cancel on disconnect) and async (no cancel) predictions via PredictionStreamGuard::drop()
  3. Drop order - Verified that field drop order in StreamState ensures the receiver is still alive when the guard's Drop runs
  4. Event replay - Properly replays history for late subscribers before subscribing to live events
  5. Keep-alive - SSE stream includes 15-second keep-alive messages
  6. Integration test - Comprehensive test covering the full SSE flow
  7. Example - Working example with Hugging Face transformers model

No additional actionable issues beyond what was already flagged.

LGTM

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

After reviewing PR #3019, I've analyzed the changes to add Server-Sent Events (SSE) streaming for predictions. The implementation reuses POST /predictions and PUT /predictions/{id} with content negotiation via Accept: text/event-stream, removing the separate GET /predictions/{id}/stream route.

Key aspects reviewed:

  • Clean SSE stream implementation with 15s keep-alive and proper event formatting
  • Correct cancellation semantics distinguishing sync vs async predictions via PredictionStreamGuard
  • Event replay for late subscribers using stream_history Vec
  • Comprehensive test coverage including unit tests and integration tests
  • Good documentation with working examples
  • Preserved backward compatibility for non-SSE requests

Previously flagged issue:
The unbounded memory growth in stream_history Vec was already identified by a previous reviewer. This is a legitimate concern for long-running predictions with frequent events.

No additional actionable issues found beyond what was already flagged.

LGTM

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

Based on my thorough review of PR #3019, I can see this PR adds Server-Sent Events (SSE) streaming for predictions by reusing POST /predictions and PUT /predictions/{id} with content negotiation via Accept: text/event-stream.

Key changes reviewed:

  • prediction.rs: Bounded stream history (1024 events) with proper event replay for late subscribers
  • service.rs: PredictionStreamGuard correctly distinguishes sync vs async prediction cancellation semantics
  • routes.rs: Clean SSE stream implementation with 15s keep-alive, content negotiation, removed separate /stream route
  • Tests: Comprehensive unit tests and integration test coverage

Previously flagged issue (addressed):
The unbounded memory growth concern in stream_history has been resolved with MAX_STREAM_HISTORY_EVENTS: usize = 1024 (line 13), and the emit_stream_event method properly removes old events when the limit is reached (lines 195-201).

Additional verification:

  • Terminal state guards prevent overwriting terminal states
  • Drop order in StreamState ensures receiver is alive when guard's Drop runs
  • Sync predictions cancel on disconnect; async predictions do not
  • Backward compatibility preserved for non-SSE requests

LGTM

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 14, 2026

Based on my review of PR #3019, I can confirm that the previously flagged issue regarding unbounded memory growth in stream_history has been addressed:

Verified fixes:

  • MAX_STREAM_HISTORY_EVENTS: usize = 1024 is defined at line 13 in prediction.rs
  • The emit_stream_event method (lines 199-206) correctly removes old events when the limit is reached and tracks skipped events

Additional verification:

  • Bounded replay test exists at line 727 in prediction.rs
  • Terminal state guards prevent overwriting terminal states (lines 230-231, 249-250, 262-263)
  • PredictionStreamGuard::drop() correctly distinguishes sync vs async cancellation (lines 141-153 in service.rs)
  • Proper drop order in StreamState ensures receiver stays alive during guard drop

The implementation is well-structured with comprehensive test coverage including unit tests for lagged streams, truncated replay, and integration tests.

LGTM

github run

@markphelps markphelps marked this pull request as ready for review May 14, 2026 21:37
@markphelps markphelps requested a review from a team as a code owner May 14, 2026 21:37
@markphelps markphelps requested a review from michaeldwan May 14, 2026 21:37
@markphelps markphelps changed the title from "Expose prediction SSE streams" to "feat: Expose prediction SSE streams" May 14, 2026
Member

@michaeldwan michaeldwan left a comment

Solid design. Main point discussed offline: streaming needs to be opt-in at the handler level via a decorator/attribute. Five concrete changes in the comments.

Comment thread crates/coglet/src/prediction.rs Outdated

fn emit_stream_event(&mut self, event: PredictionStreamEvent) {
    if self.stream_history.len() == MAX_STREAM_HISTORY_EVENTS {
        self.stream_history.remove(0);
Member

Vec::remove(0) is O(n) on every event after the buffer fills -- thousands of shifts per prediction for token-by-token LLM output. VecDeque::pop_front() is O(1).
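
The same trade-off exists in Python, which makes for a compact illustration: `list.pop(0)` shifts every remaining element, while `collections.deque.popleft()` is O(1). The sketch below is a Python analog of the eviction logic under review, not the Rust code from the PR; `BoundedHistory` and the `skipped` counter are illustrative names, and it assumes a capacity of at least 1.

```python
from collections import deque


class BoundedHistory:
    """Keep only the most recent `capacity` events, counting evictions."""

    def __init__(self, capacity: int):
        self.capacity = capacity  # assumed >= 1 in this sketch
        self.events = deque()
        self.skipped = 0

    def push(self, event) -> None:
        if len(self.events) == self.capacity:
            # O(1) removal from the front; a list would need pop(0),
            # which shifts every remaining element on each call.
            self.events.popleft()
            self.skipped += 1
        self.events.append(event)
```

Pushing five events into a history of capacity three leaves the last three events and a skipped count of two, so a late subscriber can at least be told how much it missed.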

prediction_id.clone(),
input.clone(),
webhook_sender,
response_mode != PredictionResponseMode::AsyncJson,
Member

response_mode != PredictionResponseMode::AsyncJson is true for SyncJson too. Should be response_mode == PredictionResponseMode::AsyncSse -- the current expression is inert for sync mode but reads as a bug.
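
To make the difference concrete, here is a small Python truth-table sketch. It assumes the enum has exactly the three variants named in this comment (the real Rust enum may have more), and `flag_as_written` / `flag_as_suggested` are hypothetical helper names.

```python
from enum import Enum, auto


class PredictionResponseMode(Enum):
    SYNC_JSON = auto()
    ASYNC_JSON = auto()
    ASYNC_SSE = auto()


def flag_as_written(mode: PredictionResponseMode) -> bool:
    # "Anything except AsyncJson": also true for SyncJson.
    return mode != PredictionResponseMode.ASYNC_JSON


def flag_as_suggested(mode: PredictionResponseMode) -> bool:
    # States the intent directly: only the SSE mode sets the flag.
    return mode == PredictionResponseMode.ASYNC_SSE
```

The two expressions agree for `ASYNC_JSON` and `ASYNC_SSE` and differ only for `SYNC_JSON`, which is the case the comment calls out.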

Comment thread crates/coglet/src/prediction.rs Outdated

impl Prediction {
    pub fn new(id: String, webhook: Option<WebhookSender>) -> Self {
        let (stream_tx, _) = tokio::sync::broadcast::channel(1024);
Member

Channel capacity and history cap are both 1024 but defined independently. Use a shared constant so they can't drift.

return;
}

if self.service.stream_receiver_count(&self.id) == 0
Member

The async cleanup task calls remove_prediction while the SSE client may still be draining. After removal, stream_receiver_count returns unwrap_or(0) and prediction_is_terminal returns unwrap_or(true) -- which happen to be the safe defaults. That invariant needs a comment, or hold an Arc to the entry in the guard so it doesn't depend on post-removal lookup.
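
The invariant can be modeled in a few lines of Python. This is a sketch only: `ServiceSketch` and `should_cancel_on_drop` are hypothetical, and the guard condition is assumed to be "no receivers left and not terminal", since the snippet above is truncated after the first clause.

```python
class ServiceSketch:
    def __init__(self):
        # prediction id -> {"receivers": int, "terminal": bool}
        self.predictions = {}

    def stream_receiver_count(self, pid: str) -> int:
        entry = self.predictions.get(pid)
        return entry["receivers"] if entry else 0  # mirrors unwrap_or(0)

    def prediction_is_terminal(self, pid: str) -> bool:
        entry = self.predictions.get(pid)
        return entry["terminal"] if entry else True  # mirrors unwrap_or(true)


def should_cancel_on_drop(service: ServiceSketch, pid: str) -> bool:
    # Assumed guard condition: cancel only when nobody is listening
    # and the prediction has not already finished.
    return (
        service.stream_receiver_count(pid) == 0
        and not service.prediction_is_terminal(pid)
    )
```

Because a removed entry reports as terminal, the guard never cancels a prediction that cleanup has already dropped. That safety depends entirely on the two defaults staying as they are, which is why it deserves a comment or a held reference.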

cog serve --upload-url http://unused/

curl -H Accept:text/event-stream PUT /predictions/sse-stream-test '{"id":"sse-stream-test","input":{}}'
stdout 'event: output'
Member

Missing stdout 'event: start' -- should verify the full lifecycle.
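
For readers consuming the stream outside the txtar harness, a lifecycle check could look like the Python sketch below. The parser follows the standard SSE field rules (blank line dispatches, lines starting with `:` are comments); `parse_sse` and `has_full_lifecycle` are hypothetical helpers, the payload shapes are placeholders, and treating keep-alives as SSE comment lines is an assumption.

```python
def _field_value(line: str, prefix: str) -> str:
    value = line[len(prefix):]
    # Per the SSE format, a single leading space after the colon is dropped.
    return value[1:] if value.startswith(" ") else value


def parse_sse(text: str) -> list:
    """Parse an SSE body into (event_name, data) pairs."""
    events = []
    name, data_lines = "message", []
    for line in text.splitlines():
        if line == "":
            # A blank line dispatches whatever has been collected.
            if data_lines:
                events.append((name, "\n".join(data_lines)))
            name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, e.g. a keep-alive
        elif line.startswith("event:"):
            name = _field_value(line, "event:")
        elif line.startswith("data:"):
            data_lines.append(_field_value(line, "data:"))
    return events


def has_full_lifecycle(events: list) -> bool:
    """True if the stream opens with `start` and closes with `completed`."""
    names = [name for name, _ in events]
    return len(names) >= 2 and names[0] == "start" and names[-1] == "completed"
```

A test that only asserts `event: output` would pass on a stream that never emitted `start`; checking the first and last event names catches that.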

Contributor

@ask-bonk ask-bonk Bot left a comment

I'm Bonk, and I've done a quick review of your PR.

This PR adds Server-Sent Events (SSE) streaming for predictions with event replay for late subscribers. The implementation is well-structured overall.

Summary

I've verified the issues flagged by previous reviewers:

  1. Vec::remove(0) O(n) issue - Fixed: Now uses VecDeque::pop_front() which is O(1)
  2. Unbounded memory growth - Fixed: STREAM_EVENT_BUFFER_CAPACITY (1024) limits both broadcast channel and history
  3. Integration test coverage - The test at integration-tests/tests/sse_streaming_output.txtar includes stdout 'event: start' as suggested

The implementation correctly:

  • Distinguishes sync vs async cancellation semantics via PredictionStreamGuard
  • Handles late subscribers with bounded replay (1024 events)
  • Includes 15s keep-alive for SSE streams
  • Has comprehensive test coverage including lagged stream and truncated replay scenarios

LGTM - all previously flagged issues have been addressed.

@ask-bonk
Contributor

ask-bonk Bot commented May 18, 2026

Posted review for PR #3019. All previously flagged issues have been addressed:

  1. O(n) removal fixed - Uses VecDeque::pop_front() instead of Vec::remove(0)
  2. Memory bounded - STREAM_EVENT_BUFFER_CAPACITY (1024) limits both channel and history
  3. Integration test complete - Includes stdout 'event: start' verification
  4. Constant shared - Channel capacity and history cap use the same constant
  5. Race handled - Guard correctly handles missing entries during cleanup

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 18, 2026

Based on my review of PR #3019, I've verified that all previously flagged issues have been addressed:

Verified fixes:

  1. O(n) removal - Uses VecDeque::pop_front() instead of Vec::remove(0) (prediction.rs:201)
  2. Memory bounded - STREAM_EVENT_BUFFER_CAPACITY (1024) limits both channel and history (prediction.rs:13,155,200)
  3. Shared constant - Channel capacity and history cap use the same constant
  4. Race handled - Guard correctly handles missing entries during cleanup with documented safe defaults (service.rs:147-154)
  5. Integration test - Includes stdout 'event: start' verification (sse_streaming_output.txtar:8)

Implementation highlights:

  • Clean SSE stream implementation with content negotiation via Accept: text/event-stream
  • Proper cancellation semantics distinguishing sync vs async predictions via PredictionStreamGuard
  • Event replay for late subscribers with bounded history (1024 events)
  • 15s keep-alive for SSE streams
  • Comprehensive test coverage including lagged stream and truncated replay scenarios

LGTM

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 18, 2026

LGTM

github run

@ask-bonk
Contributor

ask-bonk Bot commented May 18, 2026

Based on my thorough review of PR #3019, I've verified that all previously flagged issues have been addressed:

Verified fixes:

  1. O(n) removal - Uses VecDeque::pop_front() (prediction.rs:201) instead of Vec::remove(0)
  2. Memory bounded - STREAM_EVENT_BUFFER_CAPACITY (1024) limits both channel and history (prediction.rs:13,155,200)
  3. Shared constant - Channel capacity and history cap use the same constant
  4. Race handled - Guard correctly handles missing entries during cleanup with documented safe defaults (service.rs:147-154)
  5. Integration test - Includes stdout 'event: start' verification (sse_streaming_output.txtar:8)

Implementation highlights:

  • Clean SSE stream implementation with content negotiation via Accept: text/event-stream
  • Proper cancellation semantics distinguishing sync vs async predictions via PredictionStreamGuard
  • Event replay for late subscribers with bounded history (1024 events)
  • 15s keep-alive for SSE streams
  • Comprehensive test coverage including lagged stream and truncated replay scenarios

LGTM

github run

@markphelps
Collaborator Author

@michaeldwan decorator added in 8c9c982

@markphelps markphelps requested a review from michaeldwan May 18, 2026 20:26
Member

@michaeldwan michaeldwan left a comment

lgtm

@michaeldwan michaeldwan self-requested a review May 19, 2026 20:21
Member

@michaeldwan michaeldwan left a comment

Summary

Solid design. The broadcast channel + bounded replay architecture is sound, the opt-in gating via @cog.streaming is the right call, and the integration between Go parser → OpenAPI extension → coglet runtime is clean. The replay/live transition has no race (mutex held atomically), the bounded history buffer has no off-by-one, and completed events are emitted on all terminal paths.

Two blockers, several should-fixes. Details below and inline.

Blockers

  • examples/streaming-text/predict.py doesn't use @cog.streaming -- the README tells users to curl with Accept: text/event-stream, which will 406. The integration test sse_streaming_output.txtar correctly uses @streaming, but the shipped example doesn't. (See inline comment.)

  • No test for SSE events during failed or cancelled predictions. set_failed() and set_canceled() both emit Completed events, but no test verifies the SSE stream delivers event: completed with "status":"failed" / "status":"canceled". These are critical user-facing paths.

Should-fix

  • decoratorIsCogStreaming hard-codes "cog.streaming" instead of resolving through ImportContext. import cog as c followed by @c.streaming won't be detected. The rest of the parser handles aliases. (See inline comment.)
  • @cog.streaming() with parens silently degrades. Parser rejects call form, but Python decorator works either way. User gets a working model where SSE returns 406 with no hint about the parens.
  • No limit on SSE subscriber count per prediction. subscribe_prediction_stream() creates a new broadcast receiver with no cap. Repeated SSE connections to the same prediction ID amplify memory pressure.
  • Orphaned pending_cancellations leak memory. Cancel messages arriving after a prediction completes get stored in the HashSet and never cleaned up. (See inline comment.)
  • Double-clone on every stream event. emit_stream_event clones serde_json::Value for history storage. Arc<PredictionStreamEvent> would eliminate deep clones -- history and broadcast share the same allocation. Also fixes the O(n) deep-clone in subscribe_stream_replay() under the mutex. (See inline comment.)
  • PredictionStreamGuard::Drop calls tokio::spawn via cancel(). If dropped outside a tokio runtime context, tokio::spawn panics. Use Handle::try_current().
  • No test for concurrent SSE subscribers. The guard checks stream_receiver_count() == 0 before cancelling, but no test verifies dropping one of two subscribers doesn't cancel.
  • Training endpoints silently ignore Accept: text/event-stream. Returning 406 or documenting would be more honest than silent fallback to JSON.

Nits

  • RegisterPredictionMessage 4-element tuple → named struct.
  • streaming listed under # Metrics in __all__ -- it's a decorator.
  • Module-level F TypeVar → _F to signal internal.
  • replay.into() creates unnecessary VecDeque from Vec.
  • id.to_string() allocated twice in subscribe_prediction_stream.
  • Missing require.NotNil guard before type assertions in streaming OpenAPI tests.
  • Broadcast channel capacity and history buffer both use the same 1024 constant by coincidence -- give them separate named constants.

Verified correct

  • Replay + live transition (no race -- mutex held atomically during subscribe + snapshot)
  • Bounded history buffer (no off-by-one)
  • completed event emitted on all terminal paths
  • Terminal state guards prevent double-completion
  • findTargetFunction returning decorated_definition handled correctly via UnwrapFunction
  • cog predict CLI works fine with streaming models (uses sync JSON path)
  • docs/python.md and docs/llms.txt are accurate and in sync

),
) -> Iterator[str]:
messages = [{"role": "user", "content": prompt}]
text = self.tokenizer.apply_chat_template(
Member

Blocker: This method is missing @cog.streaming. The README (line 28-31) tells users to curl with Accept: text/event-stream, which will return 406 since the model doesn't opt in.

The integration test sse_streaming_output.txtar correctly uses @streaming, but this shipped example doesn't.

from cog import BasePredictor, Input, streaming

class Predictor(BasePredictor):
    # ...
    @streaming
    def predict(self, ...) -> Iterator[str]:

Comment thread crates/coglet/src/prediction.rs Outdated
self.stream_history.pop_front();
self.stream_history_skipped += 1;
}
self.stream_history.push_back(event.clone());
Member

Should-fix: This clones the event (containing serde_json::Value) for history, then moves the original into broadcast::send. For high-throughput models yielding many chunks, this deep-clones arbitrarily large JSON on every output.

Consider Arc<PredictionStreamEvent> for the broadcast channel type -- history and broadcast share the same allocation, and subscribe_stream_replay() becomes 1024 atomic increments instead of 1024 deep JSON clones under the prediction mutex.

stream_tx: broadcast::Sender<Arc<PredictionStreamEvent>>,
stream_history: VecDeque<Arc<PredictionStreamEvent>>,

fn emit_stream_event(&mut self, event: PredictionStreamEvent) {
    // ...
    let event = Arc::new(event);
    self.stream_history.push_back(Arc::clone(&event));
    let _ = self.stream_tx.send(event);
}

Comment thread crates/coglet/src/orchestrator.rs Outdated
None => {
tracing::debug!(%prediction_id, "Cancel requested for unknown prediction (may have already completed)");
tracing::debug!(%prediction_id, "Cancel requested for unknown prediction; storing pending cancellation");
pending_cancellations.insert(prediction_id);
Member

Should-fix: If the cancel arrives after the prediction has already completed and been removed from predictions, the ID is stored here and never consumed. In a long-running server with many cancelled predictions, this is an unbounded leak.

Consider adding a size cap (e.g., 1000 entries) or a TTL, and log a warning when it's exceeded.
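
One way to picture the suggested cap, as a Python sketch rather than the orchestrator's Rust code: an insertion-ordered set that evicts its oldest entry once the cap is exceeded and logs a warning. `PendingCancellations`, `insert`, and `take` are hypothetical names; the cap of 1000 is the example figure from this comment.

```python
import logging
from collections import OrderedDict

MAX_PENDING = 1000  # example cap from the review comment


class PendingCancellations:
    """Insertion-ordered set of prediction IDs with a hard size cap."""

    def __init__(self, cap: int = MAX_PENDING):
        self.cap = cap
        self._ids = OrderedDict()

    def __len__(self) -> int:
        return len(self._ids)

    def insert(self, prediction_id: str) -> None:
        self._ids[prediction_id] = None
        self._ids.move_to_end(prediction_id)
        while len(self._ids) > self.cap:
            evicted, _ = self._ids.popitem(last=False)  # drop the oldest
            logging.warning("pending cancellation cap hit; dropped %s", evicted)

    def take(self, prediction_id: str) -> bool:
        """Consume a pending cancellation if one was stored."""
        if prediction_id in self._ids:
            del self._ids[prediction_id]
            return True
        return False
```

Evicting the oldest entry means a very late registration could miss its cancel, so a TTL may be the better fit if cancels can legitimately arrive long before the prediction registers.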

func decoratorIsCogStreaming(node *sitter.Node, source []byte, imports *schema.ImportContext) bool {
    for _, child := range NamedChildren(node) {
        switch child.Type() {
        case "attribute":
Member

Should-fix: Hard-coded string match. import cog as c then @c.streaming won't be detected -- the content will be "c.streaming", not "cog.streaming".

The rest of the parser resolves aliases through ImportContext (e.g., IsBaseModel, IsOpaque). This should do the same:

case "attribute":
    text := Content(child, source)
    parts := strings.SplitN(text, ".", 2)
    if len(parts) != 2 || parts[1] != "streaming" {
        return false
    }
    entry, ok := imports.Names.Get(parts[0])
    return ok && entry.Module == "cog" && entry.Original == "cog"

Comment thread pkg/schema/python/parser.go Outdated
entry, ok := imports.Names.Get("streaming")
return ok && entry.Module == "cog" && entry.Original == "streaming"
case "call":
return false
Member

Should-fix: This rejects @cog.streaming() (call form), but the Python decorator works fine with either @streaming or @streaming(). A user who writes @cog.streaming() gets a model that builds, runs, and yields output -- but SSE returns 406 with no hint about the parentheses.

Either support the call form here (check if the callee is cog.streaming or imported streaming), or make the Python decorator raise a clear error when called with parens. The current behavior is a silent gotcha.
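
For context, a decorator that accepts both the bare and the call form usually looks something like the sketch below. This is a generic Python pattern, not the actual `cog.streaming` source; the `__cog_streaming__` marker is the attribute name mentioned elsewhere in this review, and since opt-in detection happens in the static parser, setting it at runtime does not by itself enable SSE.

```python
from typing import Callable, Optional


def streaming(fn: Optional[Callable] = None):
    """Mark a function as streaming; usable as @streaming or @streaming()."""

    def mark(target: Callable) -> Callable:
        target.__cog_streaming__ = True  # runtime marker only
        return target

    if fn is None:
        # Called with parentheses: return the real decorator.
        return mark
    # Used bare: `fn` is the decorated function itself.
    return mark(fn)
```

Because both spellings succeed at runtime, nothing tells the author that a static parser which only recognizes one of them has skipped the opt-in. That is the silent gotcha described above.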

Comment thread crates/coglet/src/service.rs Outdated
self: &Arc<Self>,
id: &str,
) -> Option<PredictionStreamSubscription> {
let entry = self.predictions.get(id)?;
Member

Should-fix: No cap on subscriber count. The idempotent PUT endpoint allows repeated SSE connections to the same prediction ID, each creating a new broadcast receiver. An attacker opening many connections forces the sender to retain events for slow consumers.

Consider:

let pred = entry.prediction.lock().ok()?;
if pred.stream_receiver_count() >= MAX_STREAM_SUBSCRIBERS {
    return None;
}

// Prediction cleanup may remove the service entry before the SSE response
// finishes draining. Missing entries deliberately report zero receivers and
// terminal state so this guard cannot cancel an already-cleaned prediction.
if self.service.stream_receiver_count(&self.id) == 0
Member

Should-fix: cancel() internally calls tokio::spawn. If this guard is dropped outside a tokio runtime context (panic unwinding, runtime shutdown), tokio::spawn panics.

Safer:

if let Ok(handle) = tokio::runtime::Handle::try_current() {
    let service = Arc::clone(&self.service);
    let id = self.id.clone();
    handle.spawn(async move { service.cancel(&id); });
}

Or refactor cancel() itself to use Handle::try_current() before spawning.

Comment thread python/cog/__init__.py
"CancelationException",
# Metrics
"current_scope",
"streaming",
Member

Nit: streaming is a decorator, not a metric. Move it to its own # Decorators section or under # Core classes.

Comment thread python/cog/__init__.py Outdated
URLPath,
)

F = TypeVar("F", bound=Callable[..., object])
Member

Nit: This is importable as from cog import F. Prefix with underscore (_F) to signal it's internal.

Comment thread crates/coglet/src/orchestrator.rs Outdated
}

type RegisterPredictionMessage = (
SlotId,
Member

Nit: A 4-element tuple type alias is opaque -- the ack channel's purpose is invisible at usage sites. A named struct would be clearer:

struct RegisterPredictionMessage {
    slot_id: SlotId,
    prediction: Arc<StdMutex<Prediction>>,
    idle_sender: oneshot::Sender<SlotIdleToken>,
    registered_ack: oneshot::Sender<()>,
}

@markphelps
Collaborator Author

@copilot resolve the merge conflicts in this pull request

Co-authored-by: markphelps <209477+markphelps@users.noreply.github.com>
Contributor

Copilot AI commented May 20, 2026

@copilot resolve the merge conflicts in this pull request

Resolved and pushed in 409776b. I merged main into this branch and fixed the conflicts in pkg/schema/python/parser.go, docs/python.md, and docs/llms.txt.

Comment thread python/cog/__init__.py Fixed
Comment thread python/cog/__init__.py Fixed
@ask-bonk
Contributor

ask-bonk Bot commented May 20, 2026

@markphelps Bonk workflow was cancelled.

View workflow run · To retry, trigger Bonk again.

@markphelps
Collaborator Author

/bonk review latest commits

@markphelps markphelps requested a review from michaeldwan May 20, 2026 16:08
@ask-bonk
Contributor

ask-bonk Bot commented May 20, 2026

Based on my thorough review of the latest commits in PR #3019, here's my assessment:

Summary

All previously flagged review issues have been addressed. The implementation is solid and well-tested.

Verified Fixes from Previous Reviews

Blockers (all resolved):

  • ✅ Example file uses @streaming decorator (examples/streaming-text/predict.py:24)
  • ✅ Tests for SSE events during failed/cancelled predictions (routes.rs:1340,1407)
  • ✅ Import alias support via attributeIsCogStreaming (parser.go:720-727)
  • ✅ Call form support @cog.streaming() (parser.go:704-714)

Should-fix items (all resolved):

  • ✅ Bounded memory: Uses VecDeque::pop_front() (O(1)) (prediction.rs:206)
  • ✅ Shared constant: STREAM_CHANNEL_CAPACITY / STREAM_HISTORY_CAPACITY (prediction.rs:13-14)
  • ✅ Subscriber cap: MAX_STREAM_SUBSCRIBERS enforced (service.rs:56,567-570)
  • ✅ Race handling: Documented safe defaults in guard drop (service.rs:160-167)
  • ✅ Concurrent subscribers test: dropping_one_of_two_sync_stream_subscriptions (service.rs:1273)
  • ✅ Named struct: RegisterPredictionMessage (orchestrator.rs:358-363)
  • ✅ Tokio runtime check: Handle::try_current() in spawn_orchestrator_cancel (service.rs:793-797)
  • ✅ Pending cancellations cap: MAX_PENDING_CANCELLATIONS (orchestrator.rs:32,703-713)

Nits (all resolved):

  • streaming listed under # Decorators in __all__ (python/cog/__init__.py:162)
  • _F TypeVar prefixed with underscore (python/cog/__init__.py:43)

Notable Implementation Details

  1. Clean architecture: Broadcast channel + bounded replay (1024 events) with proper event replay for late subscribers
  2. Correct cancellation semantics: Sync predictions cancel on disconnect; async predictions don't
  3. Comprehensive test coverage: Unit tests for lagged streams, bounded replay, concurrent subscribers, and integration tests
  4. Performance optimized: Uses Arc<PredictionStreamEvent> to avoid deep clones

Minor Note

The github-code-quality bot flagged the ... syntax in Python overload stubs. This is standard Python typing syntax for overload definitions and is correct.


LGTM - This PR is ready to merge. All review feedback has been addressed comprehensively.

github run

@bghira
Contributor

bghira commented May 20, 2026

i'm interested in working on WebSocket support after this lands, so that we can have a slightly-more-efficient means to stream responses in and out of the container (with not much idea of the limits of this until i actually begin it).

Member

@michaeldwan left a comment

Previous review feedback is fully addressed — every blocker and should-fix from 2026-05-19 is in with regression tests. The architecture (broadcast + bounded replay + Arc<PredictionStreamEvent> + receiver-before-guard drop order) holds up under a fresh pass.

One regression to fix before merge: a decorated train() method in a class now fails to parse (inline). The should-fixes below are real but none are correctness-critical for the SSE happy path.

Blocker

  • findMethodInClass returns decorated_definition — breaks decorated train methods. Reproduced locally with ErrParse: function has no parameters node. Inline.

Should-fix

  • SSE subscription failure leaves prediction running unobserved (inline on routes.rs)
  • Drop-order invariant in StreamState / PredictionStreamSubscription is load-bearing but undocumented (inline)
  • No test for train mode + SupportsStreaming=true: openapi.go:220 guards with !isTrain && SupportsStreaming, no coverage for the omit path (inline on openapi_test.go)
  • @streaming on a non-iterator return type silently produces broken SSE — the parser only inspects the decorator, never the return annotation (inline on parser.go)
  • examples/streaming-text/predict.py uses deprecated BasePredictor / predict() even though docs/python.md directs new code to BaseRunner / run() (inline)
  • __cog_streaming__ attribute is dead code — nothing reads it, but two unit tests assert it (inline)

Nits

  • PredictionService::cancel silently drops worker-side cancel on try_read() contention (inline)
  • subscribe_prediction_stream maps poisoned mutex to NotFound → misleading 404 (inline)
  • Aliased from cog import streaming as stream then @stream is not detected; attribute form already handles aliasing — asymmetric (inline)
  • Example's thread.join() is unreachable on early consumer exit → GPU memory leak in the copy-paste case (inline)
  • Docs list four spellings as if the user has a meaningful choice (inline on docs/python.md)

Comment thread pkg/schema/python/parser.go Outdated
nameNode := funcNode.ChildByFieldName("name")
if nameNode != nil && Content(nameNode, source) == methodName {
-    return funcNode, nil
+    return child, nil
Member

Blocker: Returning child here breaks decorated train() methods in classes. ParsePredictor (line 80) takes target.node directly without unwrapping, then line 88 calls funcNode.ChildByFieldName("parameters") — that field exists on function_definition, not decorated_definition, so any decorated train method fails to parse.

Reproduced locally:

class Trainer(BasePredictor):
    @functools.wraps(lambda x: x)
    def train(self, n: int) -> Path: ...

ErrParse: function has no parameters node.

functionSupportsStreaming already handles a function_definition argument by walking node.Parent() (parser.go:675-681), so returning funcNode here is correct and streaming detection still works.

Fix:

return funcNode, nil

Plus a regression test for a decorated train method — that gap is why CI missed this.

} else {
None
};

Member

Should-fix: The predict task is spawned before checking whether subscribe_prediction_stream returned Ok. If subscription fails (TooManySubscribers on burst, NotFound on a cleanup race), the prediction still runs to completion, fires webhooks, consumes a slot, and is cleaned up with no client watching.

Either subscribe before spawn and gate the spawn on Ok, or cancel-and-await on subscription failure.
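
The first option (subscribe, then gate the spawn on success) can be sketched as follows. This is a Python illustration only: `StreamHub`, `TooManySubscribers`, and `start_streaming_prediction` are hypothetical names, not the Rust types in routes.rs or service.rs, and the real subscription path has additional failure modes.

```python
from typing import Callable, List, Tuple


class TooManySubscribers(Exception):
    """Hypothetical error raised when the subscriber cap is reached."""


class StreamHub:
    """Hypothetical registry that enforces a subscriber cap."""

    def __init__(self, max_subscribers: int) -> None:
        self.max_subscribers = max_subscribers
        self.subscribers: List[int] = []

    def subscribe(self) -> int:
        if len(self.subscribers) >= self.max_subscribers:
            raise TooManySubscribers()
        token = len(self.subscribers)
        self.subscribers.append(token)
        return token


def start_streaming_prediction(
    hub: StreamHub, spawn: Callable[[], str]
) -> Tuple[int, str]:
    # Subscribe first: if this raises, no prediction work has started,
    # so nothing runs unobserved.
    token = hub.subscribe()
    # Only spawn once a subscriber is guaranteed to be watching.
    task = spawn()
    return token, task
```

With this ordering a rejected subscription surfaces as an error to the client before a slot is consumed or a webhook is scheduled.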

skipped: u64,
receiver: tokio::sync::broadcast::Receiver<SharedPredictionStreamEvent>,
guard: PredictionStreamGuard,
}
Member

Should-fix: Drop order is load-bearing here: receiver must drop before guard, otherwise stream_receiver_count(&id) in the guard's Drop still sees this receiver (≥1) and cancel_on_stream_drop never fires. A future field reorder silently breaks SSE cancel-on-disconnect.

The dropping_one_of_two_sync_stream_subscriptions test would catch it, but a comment is what protects refactors that don't run the full suite:

// IMPORTANT: drop order matters — receiver must drop before guard so
// stream_receiver_count() returns 0 by the time the guard runs cleanup.

Same comment belongs on StreamState in routes.rs:681-687.

require.NotNil(t, postPath)
post := postPath.(map[string]any)
assert.Equal(t, true, post["x-cog-streaming"])
}
Member

Should-fix: openapi.go:220 guards x-cog-streaming with !isTrain && info.SupportsStreaming, but no test exercises isTrain=true && SupportsStreaming=true to confirm the extension is omitted on /trainings. Easy to regress if someone refactors the guard. Mirror this test with Mode: ModeTrain and assert the extension is absent.

return nil
}

func functionSupportsStreaming(node *sitter.Node, source []byte, imports *schema.ImportContext) bool {
Member

Should-fix: functionSupportsStreaming only inspects the decorator, never the return type. A user who applies @streaming to def predict(self) -> str gets a schema claiming streaming support; SSE clients receive a single chunk + completed and conclude the model is broken.

Enforce here: require an Iterator[...] / AsyncIterator[...] / *ConcatenateIterator[...] return annotation before reporting SupportsStreaming = true. The return type is already parsed at line 108-115 — this is the natural enforcement point. Document the constraint in docs/python.md.
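
The shape of that check can be illustrated at runtime in Python. This is only an analogue: the actual enforcement would live in the Go parser and work on the syntax tree, and `returns_iterator` is a hypothetical helper. The sketch assumes that Cog's concatenating iterator types are iterator subclasses, which is not verified here.

```python
import collections.abc
import typing
from typing import AsyncIterator, Iterator


def returns_iterator(func) -> bool:
    """Return True only if func's return annotation is an iterator type."""
    ret = typing.get_type_hints(func).get("return")
    if ret is None:
        # No return annotation: cannot claim streaming support.
        return False
    # Iterator[str] -> collections.abc.Iterator; plain classes have no origin.
    origin = typing.get_origin(ret) or ret
    return isinstance(origin, type) and issubclass(
        origin, (collections.abc.Iterator, collections.abc.AsyncIterator)
    )
```

A function annotated `-> str` is rejected here even though `str` is iterable, because `str` is not an iterator; that is the case the comment above describes.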

);
}
});
spawn_orchestrator_cancel(orch, id_owned);
Member

Nit: If self.orchestrator.try_read() can't acquire the lock, the worker-side cancel is silently dropped — the CancellationToken fires but the subprocess never gets ControlRequest::Cancel. Today fine (only one .write() at startup), but a tracing::warn!(prediction_id = %id, "Skipped worker cancel: orchestrator lock unavailable") would make any future regression diagnosable.

Comment thread crates/coglet/src/service.rs Outdated
let prediction = entry
.prediction
.lock()
.map_err(|_| SubscribePredictionStreamError::NotFound)?;
Member

Nit: Poisoned mutex maps to SubscribePredictionStreamError::NotFound → returns 404. A 404 for a poisoned mutex is misleading at the diagnostic level. Either use try_lock_prediction (which attempts recovery and fails the prediction explicitly) or add a dedicated error variant.

return false
}
entry, ok := imports.Names.Get("streaming")
return ok && entry.Module == "cog" && entry.Original == "streaming"
Member

Nit: from cog import streaming as stream then @stream is not detected — this rejects anything where the local identifier isn't literally "streaming". The attribute form (line 720-727) already handles aliasing via entry.Original == "cog", so the asymmetry is mildly surprising.

One-line fix mirrors the attribute form:

func identifierIsCogStreaming(node *sitter.Node, source []byte, imports *schema.ImportContext) bool {
    entry, ok := imports.Names.Get(Content(node, source))
    return ok && entry.Module == "cog" && entry.Original == "streaming"
}

Comment thread examples/streaming-text/predict.py Outdated
if chunk:
yield chunk

thread.join()
Member

Nit: thread.join() is unreachable if the consumer of the Iterator[str] (the coglet worker) stops iterating early — e.g. prediction cancellation. The for chunk in streamer: loop exits via GeneratorExit, the join is skipped, and the generation thread keeps running until max_new_tokens completes, holding GPU memory.

Best practice for an example users will copy:

try:
    for chunk in streamer:
        if chunk:
            yield chunk
finally:
    thread.join()

(or a comment noting the limitation.)
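
A self-contained way to see the difference, using a queue in place of the example's streamer. `stream_tokens` and the `joined` event are hypothetical names for this sketch and are not part of examples/streaming-text/predict.py.

```python
import queue
import threading
from typing import Iterator, List

_SENTINEL = object()


def stream_tokens(tokens: List[str], joined: threading.Event) -> Iterator[str]:
    """Yield tokens produced by a background thread, always joining it."""
    q: "queue.Queue[object]" = queue.Queue()

    def generate() -> None:
        # Stand-in for model generation running in a worker thread.
        for token in tokens:
            q.put(token)
        q.put(_SENTINEL)

    thread = threading.Thread(target=generate)
    thread.start()
    try:
        while True:
            item = q.get()
            if item is _SENTINEL:
                break
            if item:
                yield item
    finally:
        # Runs on normal exhaustion and when the consumer closes the
        # generator early (GeneratorExit is raised at the yield).
        thread.join()
        joined.set()
```

Note that `thread.join()` waits for the worker to finish; it does not interrupt it. Stopping generation early would need a separate stop signal that the worker checks.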

Comment thread docs/python.md Outdated

To support streaming output in your Cog model, add `from typing import Iterator` to your `run.py` file. The `typing` package is a part of Python's standard library so it doesn't need to be installed. Then add a return type annotation to the `run()` method in the form `-> Iterator[<type>]` where `<type>` can be one of `str`, `int`, `float`, `bool`, or `cog.Path`.

To allow clients to receive chunks as server-sent events with `Accept: text/event-stream`, decorate the prediction method (`run()` or `predict()`) with `@cog.streaming`, `@cog.streaming()`, `@streaming`, or `@streaming()` imported from `cog`. Without the decorator, iterator outputs still work in normal JSON responses, but SSE requests return `406 Not Acceptable`.
Member

Nit: Listing all four spellings (@cog.streaming, @cog.streaming(), @streaming, @streaming()) reads like the user has a meaningful choice between four things. Consider: "decorate with @cog.streaming (or @streaming if imported directly); the parenthesized form @streaming() is also accepted." Same change in docs/llms.txt:1982.
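
For readers of the docs paragraph quoted above, a minimal sketch of what a client does with a `text/event-stream` response may help. The parser below follows the standard SSE framing (`event:` and `data:` fields, blank line ends an event); `parse_sse` is a hypothetical helper, and the payload shapes in the usage note are made up, since the PR's wire format for each event is not shown here.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group SSE lines into events of the form {"event": ..., "data": ...}."""
    event = "message"  # SSE default when no event: field is sent
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line terminates an event; events without data are dropped.
            if data:
                yield {"event": event, "data": "\n".join(data)}
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)
```

Feeding it the lines `event: output`, `data: "Hello"`, a blank line, `event: completed`, `data: {}`, and a final blank line yields two events named `output` and `completed`.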

@markphelps
Collaborator Author

@copilot resolve the merge conflicts in this pull request

Co-authored-by: markphelps <209477+markphelps@users.noreply.github.com>
Contributor

Copilot AI commented May 21, 2026

@copilot resolve the merge conflicts in this pull request

Resolved and pushed in 6bc69a6. I merged main into this branch and fixed the merge conflicts in docs/environment.md and docs/llms.txt.

@ask-bonk
Contributor

ask-bonk Bot commented May 21, 2026

Based on my thorough review of PR #3019, I can see that all the previously flagged issues from the review history have been comprehensively addressed:

Summary of Verified Fixes

All blockers and should-fix items from michaeldwan's 2026-05-19 review are resolved:

  1. Decorated train methods - findMethodInClass returns funcNode (unwrapped function), not child
  2. SSE subscription ordering - Subscription happens before spawning prediction task (routes.rs:497-519)
  3. Drop-order invariant - Documented in StreamState (routes.rs:687-688)
  4. Train mode + SupportsStreaming test - TestTrainingOperationOmitsStreamingExtensionWhenEnabled exists (openapi_test.go:692-707)
  5. @streaming on non-iterator return type - Returns error in ParsePredictor (parser.go:119-122)
  6. Example uses BaseRunner - examples/streaming-text/predict.py uses BaseRunner/run() not deprecated BasePredictor
  7. __cog_streaming__ dead code - Attribute removed; streaming detection is purely AST-based in Go parser
  8. O(n) removal - Uses VecDeque::pop_front() (prediction.rs:212)
  9. Bounded memory - STREAM_CHANNEL_CAPACITY and configurable history capacity
  10. Arc for events - SharedPredictionStreamEvent type alias used throughout
  11. Pending cancellations cap - MAX_PENDING_CANCELLATIONS (1000) with overflow handling
  12. Named struct - RegisterPredictionMessage is a named struct (orchestrator.rs:358-363)
  13. Tokio runtime check - Handle::try_current() in spawn_orchestrator_cancel
  14. Subscriber cap - MAX_STREAM_SUBSCRIBERS enforced
  15. Concurrent subscribers test - dropping_one_of_two_sync_stream_subscriptions_does_not_cancel_prediction
  16. Failed/cancelled SSE tests - Both failed_prediction_sse_stream_emits_completed_event and canceled_prediction_sse_stream_emits_completed_event exist

The implementation is solid with comprehensive test coverage and all review feedback has been addressed.

LGTM

github run

@markphelps
Collaborator Author

please @michaeldwan


4 participants